跳到主要内容

Java 并发编程 AQS 同步队列

AQS 的概念

AQS 是 AbstractQueuedSynchronizer 的简称,即抽象队列同步器,从字面意思上理解:

  • 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
  • 队列:使用先进先出(FIFO)队列存储数据;
  • 同步:实现了同步的功能。

AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的同步器,ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。

AQS 核心思想就是,如果请求的资源空闲,就直接让当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

补充:CLH(Craig Landin and Hagersten,这是三个人名)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系

通俗的理解,AQS 就是一个队列,只有当某个线程用完资源了,才能让下一个线程进来使用,如下

image.png

这里的每一个 Node 里面就是一个线程,而这里的 status 就是共享资源锁,每次这个 head 指向第一个 Node,只有它才能运行,其它的线程都需要等待,这个 tail 指向最后一个线程

说白了,这个 AQS 就是一个队列的抽象类,公平锁,非公平锁都是它的一种具体实现方式

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable

AQS 定义两种资源共享方式

1、Exclusive(独占):只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁:

  • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的

2、Share(共享):多个线程可同时执行,如 CountDownLatch、Semaphore、 CyclicBarrier、ReadWriteLock。

ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。

AQS 底层使用了模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS 类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state + 1(可重入锁)。此后,其他线程再 tryAcquire() 时就会失败,直到 A 线程 unlock() 到 state=0(即释放锁)为止,其它线程才有机会获取该锁。

当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。

再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown() 一次,state 会 CAS(Compare and Swap)减 1。等到所有子线程都执行完后(即 state=0),会 unpark() 主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

申明自己的 AQS 类(原理向)

一般要自己写一个 AQS 队列,需要继承上面的那个 AbstractQueuedSynchronizer 抽象类,这里为了方便就直接写没有去继承了

基础环境搭建

内置一个双向链表

  • 申明一个内部类 Node,每个 Node 包含指向上一个元素和下一个元素的指针以及一个 Thread对象
  • 记录链表的头元素和尾元素
  • 在当前类中记录正在占有锁的线程,设置一个变量表示锁
public class MySimpleAQS {

// 因为是共享的变量,所以要保证可见性
private volatile Node head; /* 表示头元素 */
private volatile Node tail; /* 表示尾元素 */
private volatile int status; /* 资源状态(一把锁)1:表示锁被占用 0:表示锁空闲 */
private Thread exclusiveOwnerThread; /* 当前占有锁的线程对象 */

public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}

public int getStatus() {
return status;
}

public void setStatus(int status) {
this.status = status;
}

// 取得 Unsafe 后面需要直接操作 CAS
private static Unsafe unsafe;
private static long statusOffset; /* 记录资源状态的偏移量 */
private static long tailOffset; /* 记录队尾元素的偏移量 */
private static long headOffset; /* 记录队头元素的偏移量 */
private static long threadStatusOffset; /* 记录 Node 状态的偏移量 */

static {
// 因为这个 theUnsafe 是用的 private 修饰的,所以需要使用 setAccessible 打开
// 取得静态对象是用 null
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
statusOffset = unsafe.objectFieldOffset(MySimpleAQS.class.getDeclaredField("status"));
tailOffset = unsafe.objectFieldOffset(MySimpleAQS.class.getDeclaredField("tail"));
headOffset = unsafe.objectFieldOffset(MySimpleAQS.class.getDeclaredField("head"));
threadStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("threadStatus"));
} catch (IllegalAccessException | NoSuchFieldException e) {
e.printStackTrace();
}
}

static class Node {
Node prev; /* 指向上一个元素 */
Node next; /* 指向下一个元素 */
Thread thread; /* 当前元素维护的线程 */
int threadStatus; /* 状态(1:运行 2:等待 3:取消) */

// 定义几个状态常量
static final int DEFAULT = 0;
static final int WAIT = 1;
static final int CANCELLED = -1;

// 构造方法
public Node() {
}

public Node(Thread thread) {
this.thread = thread;
}

public Node(Thread thread, int threadStatus) {
this(thread);
this.threadStatus = threadStatus;
}
}
}

acquire 方法获取锁

获取锁的几种情况:

  1. 第一个线程,不需要排队,不需要等待,不需要队列,直接获取锁
  2. 锁被占用,需要排队,但是当前没有队列,需要新建队列,并且入队
  3. 锁被占用,需要排队,而且队列中已经有其他线程,则直接入队等待

对上述的三种情况创建其他三个方法:

  1. tryAcquire
  2. acquireQueued
  3. addWaiter

这里先把这个 acquire 的几个方法模板创建出来,之后再慢慢填充,主要是需要看这个 acquire 方法的逻辑,学习并发可以知道,锁会随时释放,所以 acquire 方法把判断是否取得锁的操作拆成了多个判断来应对这种突然就取得锁的情况

如下可以发现 tryAcquire 和 acquireQueued 是分开的判断,首先通过 tryAcquire 判断取不到锁,会执行 acquireQueued 方法,这个 acquireQueued 方法主要处理线程入队,当前线程入队的过程中,如果占用锁的那个线程突然释放锁了,acquireQueued 可以抛出 false 表示当前取到锁了,无需入队

/**
* 获取锁
* @param arg
*/
public final void acquire(int arg) {
/*
1. 注意看这里把 tryAcquire 放在第一位,当他返回 true 表示获取锁成功,就无需执行后面的判断语句(创建队列,创建 Node元素,排队,线程中断)
2. 当他返回 false 说明锁被占用了,就需要执行后面的判断语句(创建队列,创建 Node元素,排队,线程中断)
*/
if (!tryAcquire(arg)
&& acquireQueued(addWaiter(), arg)) {
// 中断这个线程
Thread.currentThread().interrupt();
}
}

/**
* 判断传入的 Node 是否需要排队,如果要排队,则开始排队,如果不需要则直接获取锁
*
* 这里之所以使用 boolean 返回类型的原因就是因为可能在刚插入队列这会线程就把锁释放了,无需等待
* @param addWaiter
* @param arg
* @return 如果排队成功(线程开始等待)返回 true,如果返回 false 表示就在丢进队列这会,线程把锁释放了,无需等待
*/
private boolean acquireQueued(Node addWaiter, int arg) {
return false;
}

/**
* 根据当前线程创建一个 Node 对象,并将其入队
* @return
*/
private Node addWaiter() {
return null;
}

/**
* 尝试获取锁
* @param arg
* @return 如果获取到锁返回 true,否则返回 false
*/
private boolean tryAcquire(int arg) {
return false;
}

tryAcquire 尝试获取锁

/**
* 尝试获取锁
*
* @param arg
* @return 如果获取到锁返回 true,否则返回 false
*/
private boolean tryAcquire(int arg) {
// 获取当前的线程对象
Thread thread = Thread.currentThread();
// 获取当前锁的状态
int status = getStatus();

if (status == 0) { //当前锁处于释放状态
// 还需要判断自己是否需要排队,如果不需要排队就直接获取锁(毕竟不能插队)
// 注意这个判断,如果第一个方法返回 true,后面那个方法就不用执行了(因为是 && 连接的)
if (!hasQueuedPredecessors() && compareAndSetState(0, arg)) {
// 获取锁成功,设置占用锁的线程为当前线程
setExclusiveOwnerThread(thread);
System.out.println(thread.getName() + ":获取锁成功!");
return true;
}
} else if (thread == getExclusiveOwnerThread()) { // 判断当前尝试获取锁的线程是否就是占有锁的线程(重入锁)
status = getStatus() + arg;
setStatus(status);
return true;
}
return false;
}

/**
* 获取锁 因为这里可能会有多个线程同时操作,所以需要使用原子操作完成状态的修改
* @param expected 预期对象
* @param arg 需要修改为的状态
* @return 获取锁,成功返回 true
*/
private boolean compareAndSetState(int expected, int arg) {
return unsafe.compareAndSwapInt(this, statusOffset, expected, arg);
}

/**
* @return 判断是否需要排队
*/
private boolean hasQueuedPredecessors() {
/*
1. 队列为空,队列只有一个元素,不需要排队
2. 队列正在初始化,排队
3. 队列有元素,当前线程正好是 head元素的下一个元素,不需要排队
*/
Node h = head; /* 头元素 */
Node t = tail; /* 头元素 */
Node s; /* 头元素的下一个元素 */


/*
可能会有如下几种情况
(1)h == t
1. h = null, t = null 队列还没创建
2. h = t 队列中只有一个元素。尝试获取锁
(2)h != t
1. h != null, t = null 队列真正初始化
2. h 1= null, t != null 虽然有队列了,但是当前线程还不在队列里面
*/
return h != t && // 这里的判断使用的是 && 连接,表示出现一个 false 就返回 false
// h.next == null 表示还在初始化
// s.thread == Thread.currentThread() 表示头元素的下一个元素就是当前线程
((s = h.next) == null || s.thread != Thread.currentThread()); // 这里的判断使用的是 || 连接,表示出现一个 true 就返回 true
}

addWaiter 元素入队

addWaiter 方法主要做如下几步

  1. 使用当前线程创建一个新的 Node 对象
  2. 判断尾元素是否存在,如果存在就将当前元素加入队列,并且将当前元素修改为尾元素(原子操作 Unsafe)
  3. 如果尾元素不存在则说明队列本身就不存在,但是又没有获取到锁,说明目前有一个线程正在占用锁,没有其他线程等待
    1. 创建首元素,然后将当前新元素连接在首元素后面
    2. 设置首元素也需要原子操作(Unsafe)
/**
* 根据当前线程创建一个 Node 对象,并将其入队
*
* @return 返回当前入队的 Node
*/
private Node addWaiter() {
// 根据当前的线程创建一个 Node
Node node = new Node(Thread.currentThread());
// 取出队尾元素
Node prev = this.tail; // 新元素的上一个元素就是队尾元素
if (prev != null) {
// 队尾元素存在
node.prev = prev;
// 替换队尾元素(原子操作)
if(compareAndSetTail(prev,node)) {
prev.next = node;
return node;
}
}

// 执行这个方法,说明入队失败(1、队列不存在 2、队列正在初始化,还没有队尾元素)
return enq(node);
}

/**
* 自旋的完成入队操作
* 如果是一个线程来创建队列,则
* 循环第一次:创建了队头元素
* 循环第二次:创建了第二个元素,第二个元素就是新元素,那么新元素就会入队
* @param node 需要插入队里面的元素
* @return 返回 node
*/
private Node enq(Node node) {
while (true) {
// 获取队尾元素
Node t = this.tail;
if (t == null) { // 说明队列还没有初始化 或者 队列正在初始化(可能有别的线程在初始化)
// 如果队列还不存在需要创建新的队列,先放队头元素

// 因为可能有其他的线程也在初始化这个队列,所以这里需要使用原子操作
if(compareAndSetHead(new Node())) // 创建一个新的节点(里面三个属性都是 null)
this.tail = this.head; // 如果创建成功,让 head 和 tail 指向同一个元素
} else {
// 说明上面已经初始化了(队里初始只有一个空元素),这时直接把 node 入队就行了
// 这里的 t 经过第二轮自旋,已经实际指向 tail 了,compareAndSetTail 主要就是把 tail 的位置换成 node(注意看偏移量)
// 而 t 则是指向上面的那个 new Node() 所以,这个 t 就是 head
if (compareAndSetTail(t, node)) {
node.prev = t;
t.next = node;
return node;
}
}
}
}

/**
* 原子操作,创建队头元素
* @param node 创建的队头元素
* @return 是否创建成功
*/
private boolean compareAndSetHead(Node node) {
// 队头必须是 null 才能执行,所以这里期望值传入 null
return unsafe.compareAndSwapObject(this, headOffset, null, node);
}

/**
* 原子操作,实现队尾元素的替换
* @param prev 上个元素
* @param node 需要替换的元素
* @return 是否替换成功
*/
private boolean compareAndSetTail(Node prev, Node node) {
return unsafe.compareAndSwapObject(this, tailOffset, prev, node);
}

acquireQueued 入队等候

acquireQueued 方法主要实现,入队后等待取得锁

  1. 执行到这个方法,说明 Node 肯定已经入队了,但是未必是等候状态,这个方法就是让线程等待或者获取锁
  2. 判断当前线程的上一个元素是否是 head 元素,如果是,就尝试获取锁,获取成功就把当前元素设置设置为 head元素,并返回 true
  3. 如果上一个元素不是 head 元素,或者获取锁失败就判断当前元素是否需要等待,如果需要则等待,如果不需要就自旋

这里需要实现两个方法:shouldParkAfterFailedAcquire(node)parkAndCheckInterrupt(node)

注意这个 parkAndCheckInterrupt 方法,就是这个方法让线程停下来的,它一停就会导致调用它的 acquireQueued 停住,然后调用 acquireQueued 的 acquire 方法也停住,所以这个方法就是让线程等待的核心

/**
* 判断传入的 Node 是否需要排队,如果要排队,则开始排队,如果不需要则直接获取锁
* <p>
* 这里之所以使用 boolean 返回类型的原因就是因为可能在刚插入队列这会线程就把锁释放了,无需等待
*
* @param node addWaiter 入队的那个 Node
* @param arg
* @return 如果排队成功(线程开始等待)返回 true,如果返回 false 表示就在丢进队列这会,线程把锁释放了,无需等待
*/
private boolean acquireQueued(Node node, int arg) {
// 自旋操作
while (true) {
// 判断上一个元素是否是 head 元素
if (node.prev == head && tryAcquire(arg)) {
// 如果上个元素是 head 元素,则尝试获取锁,如果获取成功
setHead(node); // 将当前元素设置为头元素,之前的头元素就被排除在外(被 GC 回收了)
// 修改当前 node 的状态(原子操作)
compareAndSetNodeStatus(node, node.threadStatus, Node.DEFAULT);
return false; // 最终出口
}
// 判断是否需要排队等待,这里用 && 连接,表示如果需要排队,则执行后面的 parkAndCheckInterrupt 方法中断线程
if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt(node)) {
// 一定不会返回 true,因为 parkAndCheckInterrupt 一定返回 false(具体原因看注释)
return true; // 写上只是为了占位(idea的警告好烦)
}
}
}

/**
* 让当前线程排队(中断线程)
*
* @param node 当前 node
* @return 让线程等待成功返回 true,否则 false
*/
private boolean parkAndCheckInterrupt(Node node) {
// 将当前的线程状态修改为等待状态
compareAndSetNodeStatus(node, node.threadStatus, Node.WAIT);
// 中断线程
unsafe.park(false, 0);
// 这里返回的一定是 false,因为上面调用 park 已经让线程中断,所以这个 isInterrupted 方法已经执行不了,如果能执行就表示这个线程从中断恢复了
return Thread.currentThread().isInterrupted();
}

/**
* 判断当前线程是否需要排队等待
* 如果上一个元素是头元素,是不需要等待(不过前面已经判断了,能进这里表示一定不是头元素,所以这里就不判断了)
* 如果上一个元素本身都是等待状态,表示当前线程也要排队等待
* 如果上一个元素不是头元素,也不是等待状态,也没有获取锁(说明上一个线程是马上就要拿到锁的状态),当前线程排队等待
*
* @param node 当前 node
* @return 返回当前线程是否需要排队
*/
private boolean shouldParkAfterFailedAcquire(Node node) {
// 首先获取上一个元素
Node prev = node.prev;
// 上个元素不为空,不为头,且没有被取消,这时表示上个 node 要么在排队,要么在尝试获取锁,所以当前 node 一定是等待
if (prev != null && prev != head && prev.threadStatus != Node.CANCELLED) {
return true;
}
// 判断上一个元素是否被取消,如果被取消则将上一个元素从当前的队列排除(因为可能有多个被取消的,所以这里丢进 do while 循环)
if (prev != null && prev.threadStatus == Node.CANCELLED) {
do {
prev = prev.prev;
node.prev = prev;
} while (prev.threadStatus == Node.CANCELLED);
prev.next = node;
}
return false;
}

/**
* 原子操作修改 Node的状态
*
* @param node 需要修改的 node
* @param threadStatus 期望的 threadStatus
* @param arg 修改为的状态
*/
private void compareAndSetNodeStatus(Node node, int threadStatus, int arg) {
unsafe.compareAndSwapInt(node, threadStatusOffset, threadStatus, arg);
}

/**
* 头节点都是空的,因为线程已经拿到锁了,所以没必要还在队列里
* @param node 传入节点
*/
private void setHead(Node node) {
this.head = node;
node.thread = null;
node.prev = null;
}

release 释放锁

release 方法用于释放锁,并唤醒下一个线程

  1. 尝试释放锁 tryRelease()
  2. 如果释放成功则唤醒下一个线程 unparkSuccessor()
/**
* 释放锁
*
* @param arg 理论上传入的参数都是 1
*/
public void release(int arg) {
if (tryRelease(arg)) { // 如果尝试释放成功,则唤醒下一个线程
// 唤醒下一个线程
unparkSuccessor(head);
}
}

/**
* 唤醒 head 的下一个线程
*
* @param head 头
*/
private void unparkSuccessor(Node head) {
// 修改当前锁的状态为 0(释放锁)
compareAndSetState(getStatus(), 0);
// 获取 head 元素的下一个元素
Node h = this.head;
// 下一个元素是
Node next = null; // 这是要唤醒的元素
if (h != null) { // 健壮性判断
next = h.next;
}

// 如果下一个元素是 null,或者下一个元素的状态不是 wait,则从列表的最后一个元素开始往前找
// 找到最后一个状态不是 CANCELLED(取消)的作为 next
if (next == null || next.threadStatus != Node.WAIT) {
for (Node t = tail; t != null && t != h; t = t.prev) {
if (t.threadStatus != Node.CANCELLED) {
next = t;
}
}
}
// 如果 next 元素找到了,则直接换新
if (next != null) {
System.out.println("唤醒了线程:" + Thread.currentThread().getName());
// 唤醒线程
unsafe.unpark(next.thread);
// 修改状态
compareAndSetNodeStatus(next,next.threadStatus,Node.DEFAULT);
}
}

/**
* 释放锁
*
* @param arg 一般传入的都是 1
* @return 是否释放成功
*/
private boolean tryRelease(int arg) {
// 计算状态
int c = getStatus() - arg; // 1
// 判断释放锁的线程是否是当前占用锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new RuntimeException("释放锁的线程并非当前占用锁的线程");
}
if (c == 0) {
// 是要释放锁,先将当前持有锁的线程对象置空
setExclusiveOwnerThread(null);
System.out.println(Thread.currentThread().getName() + "释放了锁");
return true;
}
// 将新的状态设置回去
setStatus(c);
return false;
}

封装一个自己的锁

上面的 AQS 只是锁的 “内核” 要使用锁还需要一层皮,为了简化,这里就只实现两个方法 lock 和 unlock

public class MySimpleLock implements Lock {

MySimpleAQS mySimpleAQS = new MySimpleAQS();

@Override
public void lock() {
mySimpleAQS.acquire(1);
}

@Override
public void unlock() {
mySimpleAQS.release(1);
}

/* .... */
}

测试

public class Temp {
static class Storage {
Lock lock = new MySimpleLock();
int resource = 0;

public void add() {
lock.lock();
try {
resource++;
} finally {
lock.unlock();
}
}
}

public static void main(String[] args) throws InterruptedException {
Storage storage = new Storage();
CountDownLatch countDownLatch = new CountDownLatch(100);

for (int i = 0; i < 100; i++) {
new Thread(() -> {
for (int j = 0; j < 100; j++) {
storage.add();
}
countDownLatch.countDown();
}, "线程-" + i).start();
}

countDownLatch.await();
System.out.println(storage.resource);
}
}

完整的 AQS 类源码

Gist 地址

如果 Gist 打不开,可以用 Gitee 的代码片段

Reference

参考资料 AQS 对资源的共享方式